若泽大数据 www.ruozedata.com

ruozedata


  • 主页

  • 归档

  • 分类

  • 标签

  • 发展历史

  • Suche

生产Spark Streaming 黑名单过滤案例

Veröffentlicht am 2019-03-08 | Bearbeitet am 2019-06-14 | in Spark Streaming | Aufrufe:

测试数据(通过Socket传入):

1
2
3
20180808,zs
20180808,ls
20180808,ww

黑名单列表(生产存在表):

1
2
zs
ls

思路

  1. 原始日志可以通过Streaming直接读取成一个DStream
  2. 名单通过RDD来模拟一份

    逻辑实现

  3. 将DStream转成以下格式(黑名单只有名字)

    (zs,(20180808,zs))(ls,(20180808,ls))(ww,( 20180808,ww))

  4. 将黑名单转成

    (zs, true)(ls, true)

  5. DStram与RDD进行LeftJoin(DStream能与RDD进行Join就是借用的transform算子)

具体代码实现及注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.soul.spark.Streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @author soulChun
* @create 2019-01-10-16:12
*/
object TransformApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("StatafulFunApp").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf,Seconds(10))
//构建黑名单
val blacks = List("zs", "ls")
//通过map操作将黑名单结构转换成(zs, true)(ls, true)
val blackRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))
val lines = ssc.socketTextStream("localhost", 8769)
//lines (20180808,zs)
//lines 通过map.split(1)之后取得就是zs,然后加一个x就转成了(zs,(20180808,zs)).就可以和blackRDD进行Join了
val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
//Join之后数据结构就变成了(zs,[(20180808,zs),true]),过滤掉第二个元素中的第二个元素等于true的
rdd.leftOuterJoin(blackRDD).filter(x => x._2._2.getOrElse(false) != true)
//我们最后要输出的格式是(20180808,zs),所以取Join之后的第二个元素中的第一个元素
.map(x => x._2._1)
})
ssc.start()
ssc.awaitTermination()
}
}

最后输出:
enter description here

ruozedata WeChat Bezahlung
# spark # 高级 # sparkstreaming
刚出炉的3家大数据面试题(含高级),你会吗?
生产Spark Executor Dead快速剖析
  • Inhaltsverzeichnis
  • Übersicht

ruozedata

若泽数据优秀博客汇总
155 Artikel
31 Kategorien
74 schlagwörter
RSS
GitHub B站学习视频 腾讯课堂学习视频 官网
  1. 1. 思路
  2. 2. 逻辑实现
  3. 3. 具体代码实现及注释
|
若泽数据
|